初始化
/**
 * Initialization hook (presumably Spring's InitializingBean callback - confirm against the
 * enclosing class). Runs the three check-and-configure steps in a fixed order:
 * basic config, then export, then registry.
 *
 * @throws Exception declared by the signature; anything thrown by the three steps propagates
 */
@Override
public void afterPropertiesSet() throws Exception {
// Note: basicConfig must be configured first, because the other steps may depend on its settings.
checkAndConfigBasicConfig();
checkAndConfigExport();
checkAndConfigRegistry();
}
配置部分
//checkAndConfigBasicConfig();
<!-- Basic service configuration (id "serviceBasicConfig").
     export="demoMotan:8002" names the protocol element whose id is "demoMotan"; 8002 is presumably the listen port (confirm).
     registry="registry" names the registry element whose name is "registry".
     shareChannel="true" is the flag the endpoint factory reads when deciding whether services share one server per ip:port. -->
<motan:basicService export="demoMotan:8002"
group="motan-demo-rpc" accessLog="false" shareChannel="true" module="motan-demo-rpc"
application="myMotanDemo" registry="registry" id="serviceBasicConfig"/>
//checkAndConfigExport();
<!-- Protocol configuration. To avoid conflicts between the configurations of multiple businesses, it is recommended to identify the specific protocol by id. -->
<motan:protocol id="demoMotan" default="true" name="motan"
maxServerConnection="80000" maxContentLength="1048576"
maxWorkerThread="800" minWorkerThread="20"/>
//checkAndConfigRegistry();
<!-- Registry definition named "registry", using the zookeeper registry protocol at 127.0.0.1:2181.
     NOTE(review): connectTimeout is presumably in milliseconds; confirm against the Motan documentation. -->
<motan:registry regProtocol="zookeeper" name="registry" address="127.0.0.1:2181" connectTimeout="2000"/>
执行export()
/**
 * Reacts to the context-refreshed event by exporting the service, unless the
 * exported flag is already set.
 *
 * @param event the context-refreshed event; its contents are not inspected here
 */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    boolean alreadyExported = getExported().get();
    if (alreadyExported) {
        // Already exported once: nothing left to do.
        return;
    }
    export();
}
doExport();
doExport(protocolConfig, port, registryUrls);
exporters.add(configHandler.export(interfaceClass, ref, urls));//关键
//interfaceClass : interface com.weibo.motan.demo.service.MotanDemoService
//ref : com.weibo.motan.demo.server.MotanDemoServiceImpl@6986852
//urls : 注册中心的 URL 列表(即下面的 registryUrls,关键参数);服务自身的 URL 经过 URL 编码后存放在第一个注册 URL 的 embed 参数中


SimpleConfigHandler 中:
/**
 * Exports {@code ref} through the protocol named in the service URL, then registers the
 * service URL with the given registries.
 *
 * @param interfaceClass the service interface,
 *                       e.g. {@code com.weibo.motan.demo.service.MotanDemoService}
 * @param ref            the service implementation instance
 * @param registryUrls   registry URLs; the first one carries the URL-encoded service URL in
 *                       its "embed" parameter
 * @return the exporter produced by the protocol
 */
@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
    // Pull the embedded, URL-encoded service description out of the first registry URL.
    URL firstRegistryUrl = registryUrls.get(0);
    String embeddedService = firstRegistryUrl.getParameter(URLParamType.embed.getName());
    String decodedService = StringTools.urlDecode(embeddedService);
    // Example of the decoded value (wrapped here for readability):
    //   motan://10.226.110.67:8001/com.weibo.motan.demo.service.MotanDemoService?maxContentLength=1048576&module=motan-demo-rpc&
    //   nodeType=service&accessLog=false&minWorkerThread=20&protocol=motan&isDefault=true&application=myMotanDemo&
    //   maxWorkerThread=800&shareChannel=true&refreshTimestamp=1487644852350
    //   &id=com.weibo.api.motan.config.springsupport.ServiceConfigBean&export=demoMotan:8001&maxServerConnection=80000&group=motan-demo-rpc&
    URL serviceUrl = URL.valueOf(decodedService);

    // Resolve the protocol extension by name ("motan" in the example above) and decorate it with filters.
    String protocolName =
            serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
    Protocol rawProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName);
    Protocol protocol = new ProtocolFilterDecorator(rawProtocol);

    Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);
    // The protocol is responsible for packaging the provider into an exporter.
    Exporter<T> exporter = protocol.export(provider, serviceUrl);

    // Register the service with the registries only after the export call has returned
    // (this walkthrough uses ZooKeeper; see its registry implementation).
    register(registryUrls, serviceUrl);
    return exporter;
}




/**
 * Creates, initializes and records the exporter for {@code provider} under the protocol key
 * derived from {@code url}. At most one exporter may exist per protocol key.
 *
 * @param provider the provider to export; must not be null
 * @param url      the service URL; must not be null
 * @return the newly created and initialized exporter
 * @throws MotanFrameworkException with {@code FRAMEWORK_INIT_ERROR} if {@code url} or
 *         {@code provider} is null, or if an exporter is already stored for the protocol key
 */
@SuppressWarnings("unchecked")
@Override
public <T> Exporter<T> export(Provider<T> provider, URL url) {
    String self = this.getClass().getSimpleName();

    if (url == null) {
        throw new MotanFrameworkException(self + " export Error: url is null",
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }
    if (provider == null) {
        throw new MotanFrameworkException(self + " export Error: provider is null, url=" + url,
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }

    String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

    // The existence check, creation, init and map insertion all happen under one lock on exporterMap.
    synchronized (exporterMap) {
        // Existence check first: a protocol key maps to exactly one exporter.
        Exporter<T> existing = (Exporter<T>) exporterMap.get(protocolKey);
        if (existing != null) {
            throw new MotanFrameworkException(self + " export Error: service already exist, url=" + url,
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }

        // Build the exporter from provider + url. Per the original note, this also creates the
        // transport server (netty-based implementation).
        Exporter<T> created = createExporter(provider, url);
        // Initialization; per the original note this includes opening the netty server.
        created.init();
        exporterMap.put(protocolKey, created);

        LoggerUtil.info(self + " export Success: url=" + url);
        return created;
    }
}
DefaultRpcProtocol 中 DefaultRpcExporter 的构造方法(由上面的 createExporter 调用):
/**
 * Builds the exporter: hands provider and url to the superclass, prepares the request router,
 * resolves the endpoint factory extension named by the url, and asks it for a server.
 *
 * @param provider the provider this exporter exposes (passed to the superclass)
 * @param url      the service URL; also selects the endpoint factory extension
 */
public DefaultRpcExporter(Provider<T> provider, URL url) {
    // The superclass keeps the provider and url.
    super(provider, url);

    ProviderMessageRouter requestRouter = initRequestRouter(url);

    // Look up the endpoint factory by the name configured on the url (falling back to the default value).
    ExtensionLoader<EndpointFactory> factoryLoader = ExtensionLoader.getExtensionLoader(EndpointFactory.class);
    String factoryName = url.getParameter(
            URLParamType.endpointFactory.getName(),
            URLParamType.endpointFactory.getValue());
    endpointFactory = factoryLoader.getExtension(factoryName);

    server = endpointFactory.createServer(url, requestRouter);
}

AbstractEndpointFactory中:
/**
 * Returns a server for {@code url}, wrapping {@code messageHandler} with the heartbeat factory's
 * handler first.
 *
 * <ul>
 *   <li>shareChannel disabled: a dedicated server is always created.</li>
 *   <li>shareChannel enabled and a server already exists for the ip:port: that server is reused,
 *       provided its configuration is compatible with {@code url}.</li>
 *   <li>shareChannel enabled and no server exists yet: a new one is created from a copy of
 *       {@code url} with an empty path, and stored under the ip:port.</li>
 * </ul>
 *
 * @param url            the service URL
 * @param messageHandler the handler for incoming messages
 * @return a new or shared server
 * @throws MotanFrameworkException with {@code FRAMEWORK_EXPORT_ERROR} if an existing shared server's
 *         configuration is incompatible with {@code url}
 */
@Override
public Server createServer(URL url, MessageHandler messageHandler) {
    HeartbeatFactory heartbeatFactory = getHeartbeatFactory(url);
    MessageHandler wrappedHandler = heartbeatFactory.wrapMessageHandler(messageHandler);

    synchronized (ipPort2ServerShareChannel) {
        String ipPort = url.getServerPortStr();
        String protocolKey = MotanFrameworkUtil.getProtocolKey(url);
        boolean shareChannel = url.getBooleanParameter(
                URLParamType.shareChannel.getName(),
                URLParamType.shareChannel.getBooleanValue());

        if (!shareChannel) {
            // Dedicated port for this service.
            LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);
            // If the port is already in use, binding this server will raise an exception.
            return innerCreateServer(url, wrappedHandler);
        }

        LoggerUtil.info(this.getClass().getSimpleName() + " create share_channel server: url={}", url);

        Server existing = ipPort2ServerShareChannel.get(ipPort);
        if (existing == null) {
            // Several interfaces share this server port, so the path is blanked on a copy of the url.
            URL sharedUrl = url.createCopy();
            sharedUrl.setPath("");
            // Per the original note, the server created here is netty-based.
            Server created = innerCreateServer(sharedUrl, wrappedHandler);
            ipPort2ServerShareChannel.put(ipPort, created);
            saveEndpoint2Urls(server2UrlsShareChannel, created, protocolKey);
            return created;
        }

        // A server already exists for this ip:port; it can only be shared if the configs are compatible.
        if (!MotanFrameworkUtil.checkIfCanShallServiceChannel(existing.getUrl(), url)) {
            throw new MotanFrameworkException(
                    "Service export Error: share channel but some config param is different, protocol or codec or serialize or maxContentLength or maxServerConnection or maxWorkerThread or heartbeatFactory, source="
                            + existing.getUrl() + " target=" + url, MotanErrorMsgConstant.FRAMEWORK_EXPORT_ERROR);
        }
        saveEndpoint2Urls(server2UrlsShareChannel, existing, protocolKey);
        return existing;
    }
}
afterExport();
在集合中添加服务;
以上是大致的流程:先建立 netty 传输服务,再将暴露的服务注册到注册中心(本例为 zookeeper)。